---
title: 工作流系统概述 | Kastrax 文档
description: Kastrax 中工作流系统的概述，详细说明如何创建、配置和执行工作流。
---

# 工作流系统概述 ✅

Kastrax 工作流系统允许您定义、执行和监控涉及代理和其他组件的复杂操作序列。本指南解释了工作流系统架构以及如何有效地使用它。

## 什么是工作流？ ✅

Kastrax 中的工作流是按特定顺序执行的结构化步骤序列，数据在步骤之间流动。它们使您能够：

- 编排多个代理协作处理复杂任务
- 基于中间结果定义条件执行路径
- 在步骤之间处理和转换数据
- 处理错误和重试
- 监控和跟踪执行进度
- 为常见代理交互创建可重用模式

## 工作流系统架构 ✅

Kastrax 工作流系统由几个关键组件组成：

1. **Workflow**：定义步骤序列的顶级容器
2. **WorkflowStep**：工作流内的单个工作单元
3. **WorkflowContext**：在步骤之间传递的共享状态和数据
4. **WorkflowEngine**：执行工作流的运行时
5. **WorkflowBuilder**：用于创建工作流的 DSL
6. **WorkflowStateStorage**：工作流执行状态的存储

## 创建基本工作流 ✅

Kastrax 提供了用于创建工作流的 DSL：

```kotlin
import ai.kastrax.core.workflow.builder.workflow
import ai.kastrax.core.agent.agent
import ai.kastrax.integrations.deepseek.deepSeek
import ai.kastrax.integrations.deepseek.DeepSeekModel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // 创建工作流的代理
    val researchAgent = agent {
        name("研究代理")
        description("一个研究主题的代理")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    val writingAgent = agent {
        name("写作代理")
        description("一个基于研究写内容的代理")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    // 创建工作流
    val researchWorkflow = workflow("research-workflow", "研究并撰写文章") {
        // 研究步骤
        step(researchAgent) {
            id = "research"
            name = "研究"
            description = "研究主题"
            variables = mutableMapOf(
                "topic" to variable("$.input.topic")
            )
        }
        
        // 写作步骤
        step(writingAgent) {
            id = "writing"
            name = "写作"
            description = "基于研究撰写文章"
            after("research")
            variables = mutableMapOf(
                "research" to variable("$.steps.research.output.text")
            )
        }
        
        // 定义输出映射
        output {
            "researchResults" from "$.steps.research.output.text"
            "article" from "$.steps.writing.output.text"
            "wordCount" from {
                "$.steps.writing.output.text" to { text ->
                    (text as? String)?.split(" ")?.size ?: 0
                }
            }
        }
    }
    
    // 执行工作流
    val workflowEngine = WorkflowEngine()
    workflowEngine.registerWorkflow("research-workflow", researchWorkflow)
    
    val result = workflowEngine.executeWorkflow(
        workflowId = "research-workflow",
        input = mapOf("topic" to "人工智能")
    )
    
    println("工作流结果: ${result.output}")
}
```

## 工作流组件 ✅

### Workflow ✅

`Workflow` 接口定义了工作流的核心功能：

```kotlin
interface Workflow {
    suspend fun execute(
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): WorkflowResult

    suspend fun streamExecute(
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): Flow<WorkflowStatusUpdate>
}
```

### WorkflowStep ✅

`WorkflowStep` 接口定义了工作流内的单个步骤：

```kotlin
interface WorkflowStep {
    val id: String
    val name: String
    val description: String
    val after: List<String>
    val variables: Map<String, VariableReference>
    val config: StepConfig?
        get() = null
    val condition: (WorkflowContext) -> Boolean
        get() = { true }
    
    suspend fun execute(context: WorkflowContext): WorkflowStepResult
}
```

### WorkflowContext ✅

`WorkflowContext` 类在工作流执行期间保存状态和数据：

```kotlin
data class WorkflowContext(
    val input: Map<String, Any?>,
    val steps: MutableMap<String, WorkflowStepResult> = mutableMapOf(),
    val variables: MutableMap<String, Any?> = mutableMapOf(),
    val runId: String? = null
) {
    fun resolveReference(reference: VariableReference): Any? {
        // 实现...
    }
    
    fun resolveVariables(variables: Map<String, VariableReference>): Map<String, Any?> {
        // 实现...
    }
}
```

### WorkflowEngine ✅

`WorkflowEngine` 类负责执行工作流：

```kotlin
class WorkflowEngine(
    private val stateStorage: WorkflowStateStorage = InMemoryWorkflowStateStorage()
) {
    private val workflows = mutableMapOf<String, Workflow>()
    
    fun registerWorkflow(workflowId: String, workflow: Workflow) {
        workflows[workflowId] = workflow
    }
    
    suspend fun executeWorkflow(
        workflowId: String,
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): WorkflowResult {
        // 实现...
    }
    
    suspend fun streamExecuteWorkflow(
        workflowId: String,
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): Flow<WorkflowStatusUpdate> {
        // 实现...
    }
}
```

## 步骤类型 ✅

Kastrax 提供了几种内置步骤类型：

### 代理步骤 ✅

最常见的步骤类型，使用特定输入执行代理：

```kotlin
step(agent) {
    id = "generate-content"
    name = "生成内容"
    description = "基于主题生成内容"
    variables = mutableMapOf(
        "topic" to variable("$.input.topic"),
        "style" to variable("$.input.style")
    )
}
```

### 函数步骤 ✅

执行自定义函数：

```kotlin
functionStep {
    id = "process-data"
    name = "处理数据"
    description = "处理来自前面步骤的数据"
    after("collect-data")
    function { context ->
        val data = context.resolveReference(variable("$.steps.collect-data.output.data"))
        // 处理数据
        mapOf("processedData" to processData(data))
    }
}
```

### 条件步骤 ✅

基于条件执行：

```kotlin
step(agent) {
    id = "optional-step"
    name = "可选步骤"
    description = "此步骤仅在特定条件下执行"
    after("previous-step")
    condition { context ->
        val quality = context.resolveReference(variable("$.steps.previous-step.output.quality"))
        (quality as? Int) ?: 0 > 7
    }
    variables = mutableMapOf(
        "input" to variable("$.steps.previous-step.output.text")
    )
}
```

### 子工作流步骤 ✅

将另一个工作流作为步骤执行：

```kotlin
subworkflowStep {
    id = "nested-workflow"
    name = "嵌套工作流"
    description = "执行嵌套工作流"
    workflowId = "another-workflow"
    inputMapping { context ->
        mapOf(
            "data" to context.resolveReference(variable("$.steps.previous-step.output.data"))
        )
    }
}
```

## 数据流 ✅

数据通过变量引用在工作流中流动：

### 变量引用 ✅

变量引用使用类似 JSON 路径的语法访问数据：

```kotlin
// 引用工作流输入
val topicRef = variable("$.input.topic")

// 引用前一步骤的输出
val researchRef = variable("$.steps.research.output.text")

// 引用全局变量
val configRef = variable("$.variables.config")
```

### 输出映射 ✅

输出映射定义如何提取和转换最终工作流输出：

```kotlin
output {
    // 直接映射
    "researchResults" from "$.steps.research.output.text"
    
    // 带转换的映射
    "wordCount" from {
        "$.steps.writing.output.text" to { text ->
            (text as? String)?.split(" ")?.size ?: 0
        }
    }
    
    // 组合多个来源
    "summary" from {
        "$.steps.research.output.text" to { research ->
            "$.steps.writing.output.text" to { article ->
                "研究: $research\n\n文章: $article"
            }
        }
    }
}
```

## 错误处理 ✅

工作流可以通过几种方式处理错误：

### 重试配置 ✅

```kotlin
step(agent) {
    id = "unreliable-step"
    name = "不可靠步骤"
    description = "可能失败的步骤"
    
    // 配置重试
    config = StepConfig(
        retryConfig = RetryConfig(
            maxRetries = 3,
            retryDelay = 1000, // 1 秒
            backoffMultiplier = 2.0 // 指数退避
        )
    )
}
```

### 错误处理器 ✅

```kotlin
// 创建错误处理工作流引擎
val workflowEngine = ErrorHandlingWorkflowEngine(
    errorHandler = object : ErrorHandler {
        override suspend fun handleStepError(
            workflowId: String,
            runId: String,
            stepId: String,
            error: Throwable
        ): ErrorHandlingAction {
            return when {
                error is TimeoutException -> ErrorHandlingAction.Retry(maxRetries = 3)
                stepId == "optional-step" -> ErrorHandlingAction.Skip
                else -> ErrorHandlingAction.Fail
            }
        }
    }
)
```

## 工作流执行 ✅

工作流可以通过不同方式执行：

### 同步执行 ✅

```kotlin
val result = workflowEngine.executeWorkflow(
    workflowId = "my-workflow",
    input = mapOf("key" to "value")
)

println("工作流完成，输出: ${result.output}")
```

### 流式执行 ✅

```kotlin
workflowEngine.streamExecuteWorkflow(
    workflowId = "my-workflow",
    input = mapOf("key" to "value")
).collect { update ->
    when (update) {
        is WorkflowStatusUpdate.Started -> println("工作流已启动")
        is WorkflowStatusUpdate.StepStarted -> println("步骤已启动: ${update.stepId}")
        is WorkflowStatusUpdate.StepCompleted -> println("步骤已完成: ${update.stepId}")
        is WorkflowStatusUpdate.StepFailed -> println("步骤失败: ${update.stepId}, 错误: ${update.error}")
        is WorkflowStatusUpdate.Completed -> println("工作流已完成")
        is WorkflowStatusUpdate.Failed -> println("工作流失败: ${update.error}")
    }
}
```

### 执行选项 ✅

```kotlin
val options = WorkflowExecuteOptions(
    maxSteps = 20, // 最大执行步骤数
    timeout = 120000, // 2 分钟超时
    onStepFinish = { stepResult ->
        println("步骤 ${stepResult.stepId} 完成，输出: ${stepResult.output}")
    },
    onStepError = { stepId, error ->
        println("步骤 $stepId 失败，错误: ${error.message}")
    },
    threadId = "conversation-123" // 可选的线程 ID，用于记忆集成
)

val result = workflowEngine.executeWorkflow(
    workflowId = "my-workflow",
    input = mapOf("key" to "value"),
    options = options
)
```

## 工作流状态管理 ✅

工作流在执行期间和之后维护状态：

### 状态存储 ✅

```kotlin
// 内存状态存储（默认）
val inMemoryStorage = InMemoryWorkflowStateStorage()

// 基于文件的状态存储
val fileStorage = FileWorkflowStateStorage("workflows/state")

// 数据库状态存储
val dbStorage = DatabaseWorkflowStateStorage(dataSource)

// 使用自定义状态存储创建工作流引擎
val workflowEngine = WorkflowEngine(stateStorage = fileStorage)
```

### 状态查询 ✅

```kotlin
// 获取工作流状态
val state = workflowEngine.getWorkflowState("my-workflow", "run-123")

// 获取工作流的所有运行
val runs = workflowEngine.getWorkflowRuns("my-workflow")

// 获取活动运行
val activeRuns = workflowEngine.getActiveWorkflowRuns()
```

## 高级工作流功能 ✅

### 并行执行 ✅

```kotlin
val parallelWorkflow = workflow("parallel-workflow", "并行执行步骤") {
    // 这些步骤没有依赖关系，所以可以并行运行
    step(agent1) {
        id = "step1"
        name = "步骤 1"
        description = "第一个并行步骤"
    }
    
    step(agent2) {
        id = "step2"
        name = "步骤 2"
        description = "第二个并行步骤"
    }
    
    // 此步骤依赖于两个并行步骤
    step(agent3) {
        id = "step3"
        name = "步骤 3"
        description = "并行执行后的最终步骤"
        after("step1", "step2")
        variables = mutableMapOf(
            "result1" to variable("$.steps.step1.output.result"),
            "result2" to variable("$.steps.step2.output.result")
        )
    }
}
```

### 工作流组合 ✅

```kotlin
// 创建工作流组合器
val composer = WorkflowComposer("my-composer", workflowEngine)

// 顺序组合工作流
val sequentialWorkflow = composer.composeSequential(
    workflowName = "sequential-workflow",
    description = "按顺序执行工作流",
    workflows = listOf(
        "workflow1" to { input -> input }, // workflow1 的输入映射
        "workflow2" to { input, prevOutput -> prevOutput } // 使用前一个输出作为输入
    )
)

// 并行组合工作流
val parallelWorkflow = composer.composeParallel(
    workflowName = "parallel-workflow",
    description = "并行执行工作流",
    workflows = mapOf(
        "branch1" to "workflow1",
        "branch2" to "workflow2"
    ),
    inputMapping = { branch, input ->
        // 为每个分支映射输入
        when (branch) {
            "branch1" -> mapOf("key1" to input["value"])
            "branch2" -> mapOf("key2" to input["value"])
            else -> emptyMap()
        }
    },
    mergeStep = MergeResultsStep(
        id = "merge",
        name = "合并结果",
        description = "合并来自并行工作流的结果"
    )
)
```

### 状态机 ✅

对于具有多个可能路径的复杂工作流，您可以使用状态机：

```kotlin
// 定义状态
enum class OrderState {
    NEW, PROCESSING, SHIPPED, DELIVERED, CANCELED
}

// 定义事件
sealed class OrderEvent {
    data class PlaceOrder(val items: List<String>) : OrderEvent()
    data class ProcessOrder(val paymentId: String) : OrderEvent()
    data class ShipOrder(val trackingNumber: String) : OrderEvent()
    data class DeliverOrder(val deliveryDate: String) : OrderEvent()
    object CancelOrder : OrderEvent()
}

// 创建状态机
val orderStateMachine = BasicStateMachine<OrderState, OrderEvent, Map<String, Any>>(
    initialState = OrderState.NEW,
    initialContext = emptyMap(),
    transitioner = object : StateTransitioner<OrderState, OrderEvent, Map<String, Any>> {
        override suspend fun transition(
            currentState: OrderState,
            event: OrderEvent,
            context: Map<String, Any>
        ): TransitionResult<OrderState, Map<String, Any>> {
            // 定义状态转换
            val (nextState, updatedContext) = when (currentState to event) {
                OrderState.NEW to is OrderEvent.PlaceOrder -> {
                    OrderState.PROCESSING to context + ("items" to event.items)
                }
                OrderState.PROCESSING to is OrderEvent.ProcessOrder -> {
                    OrderState.SHIPPED to context + ("paymentId" to event.paymentId)
                }
                OrderState.SHIPPED to is OrderEvent.ShipOrder -> {
                    OrderState.DELIVERED to context + ("trackingNumber" to event.trackingNumber)
                }
                OrderState.SHIPPED to is OrderEvent.DeliverOrder -> {
                    OrderState.DELIVERED to context + ("deliveryDate" to event.deliveryDate)
                }
                else to is OrderEvent.CancelOrder -> {
                    OrderState.CANCELED to context
                }
                else -> currentState to context
            }
            
            return TransitionResult(
                nextState = nextState,
                updatedContext = updatedContext,
                sideEffects = emptyList()
            )
        }
    }
)

// 使用状态机
val nextState = orderStateMachine.sendEvent(OrderEvent.PlaceOrder(listOf("item1", "item2")))
println("下一个状态: $nextState")
```

## 完整示例 ✅

以下是一个复杂工作流的完整示例：

```kotlin
import ai.kastrax.core.workflow.builder.workflow
import ai.kastrax.core.workflow.engine.WorkflowEngine
import ai.kastrax.core.agent.agent
import ai.kastrax.integrations.deepseek.deepSeek
import ai.kastrax.integrations.deepseek.DeepSeekModel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // 创建代理
    val researchAgent = agent {
        name("研究代理")
        description("一个研究主题的代理")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    val writingAgent = agent {
        name("写作代理")
        description("一个基于研究写内容的代理")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    val editingAgent = agent {
        name("编辑代理")
        description("一个编辑和改进内容的代理")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    val factCheckingAgent = agent {
        name("事实核查代理")
        description("一个验证内容中事实的代理")
        
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    
    // 创建内容创建工作流
    val contentWorkflow = workflow("content-creation", "创建经过研究和编辑的内容") {
        // 研究步骤
        step(researchAgent) {
            id = "research"
            name = "研究"
            description = "彻底研究主题"
            variables = mutableMapOf(
                "topic" to variable("$.input.topic"),
                "depth" to variable("$.input.researchDepth")
            )
        }
        
        // 写作步骤
        step(writingAgent) {
            id = "writing"
            name = "写作"
            description = "基于研究写内容"
            after("research")
            variables = mutableMapOf(
                "research" to variable("$.steps.research.output.text"),
                "style" to variable("$.input.contentStyle"),
                "length" to variable("$.input.contentLength")
            )
        }
        
        // 并行事实核查和编辑
        step(factCheckingAgent) {
            id = "fact-checking"
            name = "事实核查"
            description = "验证内容中的事实"
            after("writing")
            variables = mutableMapOf(
                "content" to variable("$.steps.writing.output.text"),
                "research" to variable("$.steps.research.output.text")
            )
        }
        
        step(editingAgent) {
            id = "editing"
            name = "编辑"
            description = "编辑和改进内容"
            after("writing")
            variables = mutableMapOf(
                "content" to variable("$.steps.writing.output.text"),
                "style" to variable("$.input.contentStyle")
            )
        }
        
        // 最终修订步骤
        step(editingAgent) {
            id = "final-revision"
            name = "最终修订"
            description = "创建包含编辑和事实核查的最终版本"
            after("fact-checking", "editing")
            variables = mutableMapOf(
                "original" to variable("$.steps.writing.output.text"),
                "edits" to variable("$.steps.editing.output.text"),
                "factChecks" to variable("$.steps.fact-checking.output.text")
            )
        }
        
        // 条件质量检查步骤
        step(editingAgent) {
            id = "quality-check"
            name = "质量检查"
            description = "如果请求，执行质量检查"
            after("final-revision")
            condition { context ->
                val performQualityCheck = context.resolveReference(variable("$.input.performQualityCheck"))
                (performQualityCheck as? Boolean) ?: false
            }
            variables = mutableMapOf(
                "content" to variable("$.steps.final-revision.output.text")
            )
        }
        
        // 定义输出映射
        output {
            "research" from "$.steps.research.output.text"
            "draft" from "$.steps.writing.output.text"
            "factChecks" from "$.steps.fact-checking.output.text"
            "edits" from "$.steps.editing.output.text"
            "finalContent" from "$.steps.final-revision.output.text"
            "qualityScore" from {
                "$.steps.quality-check.output.score" to { score ->
                    score ?: "未执行质量检查"
                }
            }
            "metadata" from {
                "$.steps.writing.output.wordCount" to { wordCount ->
                    "$.steps.final-revision.output.wordCount" to { finalWordCount ->
                        mapOf(
                            "initialWordCount" to wordCount,
                            "finalWordCount" to finalWordCount,
                            "changePercentage" to calculatePercentage(wordCount, finalWordCount)
                        )
                    }
                }
            }
        }
    }
    
    // 创建工作流引擎
    val workflowEngine = WorkflowEngine()
    workflowEngine.registerWorkflow("content-creation", contentWorkflow)
    
    // 执行工作流
    val result = workflowEngine.executeWorkflow(
        workflowId = "content-creation",
        input = mapOf(
            "topic" to "人工智能伦理",
            "researchDepth" to "全面",
            "contentStyle" to "学术",
            "contentLength" to "2000 字",
            "performQualityCheck" to true
        )
    )
    
    // 打印结果
    println("工作流完成，状态: ${if (result.success) "成功" else "失败"}")
    println("最终内容: ${result.output["finalContent"]}")
    println("质量分数: ${result.output["qualityScore"]}")
    println("元数据: ${result.output["metadata"]}")
}

// 计算百分比变化的辅助函数
fun calculatePercentage(initial: Any?, final: Any?): Double {
    val initialValue = (initial as? Number)?.toDouble() ?: return 0.0
    val finalValue = (final as? Number)?.toDouble() ?: return 0.0
    
    if (initialValue == 0.0) return 0.0
    return ((finalValue - initialValue) / initialValue) * 100.0
}
```

## 最佳实践 ✅

1. **步骤粒度**：设计适当粒度的步骤 - 不要太小以至于创建开销，不要太大以至于失去灵活性
2. **错误处理**：为不可靠操作实现具有重试功能的强大错误处理
3. **数据流**：使用变量引用明确步骤之间的数据流
4. **条件逻辑**：使用条件创建能够适应中间结果的动态工作流
5. **监控**：为工作流执行设置适当的监控和日志记录
6. **状态管理**：根据您的可靠性要求选择适当的状态存储
7. **测试**：使用不同的输入和边缘情况测试工作流
8. **组合**：从更简单、可重用的工作流组合复杂工作流

## 下一步 ✅

现在您已经了解了工作流系统，您可以：

1. 了解[工作流定义](./workflow-definition.mdx)
2. 探索[工作流执行](./workflow-execution.mdx)
3. 实现[代理与工作流的集成](./agent-integration.mdx)
4. 设置[工作流状态管理](./state-management.mdx)
